package com.atguigu.networkflow.analysis;

import com.atguigu.networkflow.analysis.bean.ApacheLogEvent;
import com.atguigu.networkflow.analysis.bean.PageViewCount;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Hot-pages job: counts GET page views per URL over a sliding 10-minute window (5-second slide),
 * accepts late records for {@link #ALLOWED_LATENESS}, and prints the top-N URLs of each window.
 */
public class HotPages2 {
    /**
     * Allowed lateness of the page-count windows. TopNHotPages keeps per-window state for the
     * same duration, so both sides must use this single value.
     */
    private static final Time ALLOWED_LATENESS = Time.minutes(1);

    /**
     * Matches URLs that do not end in a static-resource suffix (.css, .js, .png, .ico).
     * Compiled once rather than on every record.
     */
    private static final Pattern PAGE_URL_PATTERN = Pattern.compile("^((?!\\.(css|js|png|ico)$).)*$");

    public static void main(String[] args) throws Exception {
        // Create the streaming environment; event time drives the windows and timers below.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read raw log lines from a socket. To replay the bundled /apache.log resource instead,
        // use env.readTextFile(HotPages.class.getResource("/apache.log").getPath()).
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Parse each space-separated line into an ApacheLogEvent and assign event-time
        // timestamps with a bounded out-of-orderness of 1 second.
        DataStream<ApacheLogEvent> dataStream = inputStream.map(line -> {
            String[] fields = line.split(" ");
            // A fresh formatter per record: SimpleDateFormat is not thread-safe, so it is not shared.
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
            Long timestamp = sdf.parse(fields[3]).getTime();
            return new ApacheLogEvent(fields[0], fields[1], timestamp, fields[5], fields[6]);
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLogEvent>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(ApacheLogEvent apacheLogEvent) {
                return apacheLogEvent.getTimestamp();
            }
        });

        dataStream.print("data");

        // Side-output tag for records that arrive after the allowed lateness has expired.
        OutputTag<ApacheLogEvent> lateTag = new OutputTag<ApacheLogEvent>("late") {
        };

        // Keep GET requests for non-static pages, key by URL, and count views per sliding window.
        SingleOutputStreamOperator<PageViewCount> windowAggStream = dataStream
                .filter(data -> "GET".equals(data.getMethod()))
                .filter(data -> PAGE_URL_PATTERN.matcher(data.getUrl()).matches())
                .keyBy(ApacheLogEvent::getUrl)
                .timeWindow(Time.minutes(10), Time.seconds(5))
                // Records up to ALLOWED_LATENESS late still update the window's count.
                .allowedLateness(ALLOWED_LATENESS)
                .sideOutputLateData(lateTag)
                .aggregate(new PageCountAgg(), new PageCountResult());

        windowAggStream.print("agg");
        windowAggStream.getSideOutput(lateTag).print("late");

        // Collect all counts of the same window (keyed by window end) and emit the top 3 pages.
        DataStream<String> resultStream = windowAggStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new TopNHotPages(3, ALLOWED_LATENESS.toMilliseconds()));
        resultStream.print();
        env.execute("hot pages job");
    }

    /**
     * Keyed by window end: keeps the latest count per URL for that window, emits a ranked
     * top-N report one millisecond after the window end (and again whenever a late update
     * arrives), and clears the state {@code cleanupDelayMs} after the window end.
     */
    public static class TopNHotPages extends KeyedProcessFunction<Long, PageViewCount, String> {
        /** Default cleanup delay (1 minute), matching the allowed lateness used in main. */
        private static final long DEFAULT_CLEANUP_DELAY_MS = 60 * 1000L;

        private final Integer topSize;
        private final long cleanupDelayMs;

        /** Latest count per URL for the current key's window; a late update overwrites it. */
        private transient MapState<String, Long> pageViewCountMapState;

        /**
         * @param topSize number of pages to include in each report
         */
        public TopNHotPages(Integer topSize) {
            this(topSize, DEFAULT_CLEANUP_DELAY_MS);
        }

        /**
         * @param topSize        number of pages to include in each report
         * @param cleanupDelayMs how long after the window end the state is kept; should match the
         *                       upstream window's allowed lateness so late updates are still ranked
         * @throws IllegalArgumentException if {@code cleanupDelayMs <= 1}, which would make the
         *                                  cleanup timer fire no later than the report timer
         */
        public TopNHotPages(Integer topSize, long cleanupDelayMs) {
            if (cleanupDelayMs <= 1) {
                throw new IllegalArgumentException(
                        "cleanupDelayMs must be greater than 1 so cleanup runs after the report timer, got "
                                + cleanupDelayMs);
            }
            this.topSize = topSize;
            this.cleanupDelayMs = cleanupDelayMs;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            pageViewCountMapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<String, Long>("page-count-list", String.class, Long.class));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context, Collector<String> out) throws Exception {
            pageViewCountMapState.put(pageViewCount.getUrl(), pageViewCount.getCount());
            long windowEnd = pageViewCount.getWindowEnd();
            // Report timer; re-registering it after it already fired makes a late update
            // produce a fresh report on the next watermark.
            context.timerService().registerEventTimeTimer(windowEnd + 1);
            // Cleanup timer, once the upstream window can no longer emit late updates.
            context.timerService().registerEventTimeTimer(windowEnd + cleanupDelayMs);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            long windowEnd = ctx.getCurrentKey();

            // The cleanup timer: the window is closed for good, so drop its state.
            if (timestamp == windowEnd + cleanupDelayMs) {
                pageViewCountMapState.clear();
                return;
            }

            // Otherwise this is the report timer: rank URLs by count, highest first.
            ArrayList<Map.Entry<String, Long>> pageViewCounts = new ArrayList<>();
            for (Map.Entry<String, Long> entry : pageViewCountMapState.entries()) {
                pageViewCounts.add(entry);
            }
            pageViewCounts.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));

            // Format the ranking as a printable report.
            StringBuilder resultBuilder = new StringBuilder();
            resultBuilder.append("========================================");
            resultBuilder.append("\n").append("窗口结束时间:").append(new Timestamp(windowEnd));
            resultBuilder.append("\n");

            int limit = Math.min(topSize, pageViewCounts.size());
            for (int i = 0; i < limit; i++) {
                Map.Entry<String, Long> entry = pageViewCounts.get(i);
                resultBuilder.append("No. ").append(i + 1).append(":").append(" 请求地址 = ").append(entry.getKey())
                        .append(" 浏览量 = ").append(entry.getValue()).append("\n");
            }

            resultBuilder.append("========================================\n\n");
            out.collect(resultBuilder.toString());
        }
    }

    /** Counts the events in a window: the accumulator is incremented once per event. */
    public static class PageCountAgg implements AggregateFunction<ApacheLogEvent, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(ApacheLogEvent apacheLogEvent, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Wraps the pre-aggregated count of a window into a PageViewCount tagged with the URL and
     * the window end. The iterable holds the single result produced by PageCountAgg.
     */
    public static class PageCountResult implements WindowFunction<Long, PageViewCount, String, TimeWindow> {
        @Override
        public void apply(String url, TimeWindow timeWindow, Iterable<Long> iterable, Collector<PageViewCount> out) throws Exception {
            out.collect(new PageViewCount(url, timeWindow.getEnd(), iterable.iterator().next()));
        }
    }
}
